package com.kaigejava.rocketmq.maindemo.product.transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo producer for RocketMQ transactional messages.
 *
 * <p>Sends three messages tagged {@code TAGA}, {@code TAGB} and {@code TAGC} to the topic
 * {@code transaction-topic}. The local-transaction callback commits {@code TAGA}, rolls back
 * {@code TAGB} and reports {@code UNKNOW} for any other tag; the check-back callback always
 * answers {@code COMMIT_MESSAGE}.
 *
 * @author 凯哥Java
 * @since 2022/10/19 14:50
 */
public class TransactionMessageProducer {
    /**
     * Configures a {@link TransactionMQProducer}, starts it and sends one transactional message
     * per demo tag, printing the send result of each.
     *
     * <p>On success the producer is deliberately left running (the process does not exit on its
     * own) so that the check-back callback can still be invoked for the message whose local
     * transaction state was reported as {@code UNKNOW}. If start-up or sending fails, the producer
     * and the check executor are shut down before the exception propagates.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the producer cannot be started or a message cannot be sent
     */
    public static void main(String[] args) throws Exception {
        // 1: create the producer and assign its producer group.
        TransactionMQProducer producer = new TransactionMQProducer("transaction-group");
        // 2: point the producer at the name server.
        producer.setNamesrvAddr("192.168.50.132:9876");
        // Send timeout in milliseconds.
        producer.setSendMsgTimeout(10000);
        // Custom thread pool handed to the producer for handling transaction check requests.
        ExecutorService executorService = new ThreadPoolExecutor(
                2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                });
        producer.setExecutorService(executorService);
        // Register the callbacks for the local transaction and for the check-back.
        producer.setTransactionListener(new TransactionListener() {
            /**
             * Runs the local transaction after the half message has been sent.
             *
             * @param msg the message that was just sent; its tag selects the outcome
             * @param arg the extra argument supplied to {@code sendMessageInTransaction}; ignored
             * @return {@code COMMIT_MESSAGE} for tag {@code TAGA}, {@code ROLLBACK_MESSAGE} for tag
             *         {@code TAGB}, {@code UNKNOW} for every other tag (including no tag)
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println("发送完half消息后，执行本地事务开始。。。");
                if (StringUtils.equals("TAGA", msg.getTags())) {
                    // TAGA: commit normally.
                    System.out.println("TAGA的消息，正常提交");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TAGB", msg.getTags())) {
                    // TAGB: roll back.
                    System.out.println("TAGB的消息。这里需要进行回滚操作~~~~");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                // Anything else: leave the state undecided so the check-back decides later.
                System.out.println("TAGC的消息。设置为位置。MQ会执行事务回查的");
                return LocalTransactionState.UNKNOW;
            }

            /**
             * Check-back callback used to resolve a transaction whose state is still undecided.
             *
             * @param msg the message whose transaction state is being queried
             * @return always {@code COMMIT_MESSAGE} in this demo
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("MQ执行事务回查方法。当前回查消息TAG:" + msg.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        boolean allSent = false;
        try {
            // 3: start the producer.
            producer.start();
            // 4: build one message per tag with topic, tag and body.
            String[] tags = new String[] {"TAGA", "TAGB", "TAGC"};
            for (int i = 0; i < 3; i++) {
                Message message = new Message();
                message.setTopic("transaction-topic");
                message.setTags(tags[i]);
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                message.setBody(("from transaction-main" + i).getBytes(StandardCharsets.UTF_8));
                // 5: send the message; the second argument is the extra object passed along with
                // the send call and is not used by the listener above.
                SendResult result = producer.sendMessageInTransaction(message, 60000);

                SendStatus status = result.getSendStatus();
                String msgId = result.getMsgId();
                int queueId = result.getMessageQueue().getQueueId();
                String offsetMegId = result.getOffsetMsgId();
                long offset = result.getQueueOffset();
                String sendResultMsg = "同步消息第"+i+"个发送状态："+status+"\t"+"消息id:"+msgId+"\t 消费者队列id:"+queueId +"\t offsetMegId:"+offsetMegId+"\t offset:"+offset;
                System.out.println(sendResultMsg);
            }
            allSent = true;
        } finally {
            // 6: on success the producer stays alive to serve check-back requests. On failure,
            // release it and the check executor so leftover threads do not keep the JVM running.
            if (!allSent) {
                producer.shutdown();
                executorService.shutdown();
            }
        }
    }
}
